细读经典 您所在的位置:网站首页 rocketmq ttl 细读经典

细读经典

2023-08-28 01:05| 来源: 网络整理| 查看: 265

二、消息发送

MQ发送普通消息有三种方式:可靠同步、可靠异步、单向,可靠的意思是会收到发送的结果

基于以上认识,我们可以看看一条消息发送的整个过程是怎样的。

2.1 抽象部分:

这里很自然能想到需要定义消息实体Message::

public class Message implements Serializable { // 主题 private String topic; // 保留 private int flag; // 扩展属性 // 包含消息TAG,消息索引keys,消息延迟级别delayTimeLevel private Map properties; // 消息体 private byte[] body; // 事务id private String transactionId; }

接下来思路就是构建生产者,消费者模型,RocketMQ给出了一个父类MQClientInstance用来标识一个生产者或者消费者实例,MQClientInstance由MQClientManager维护的一个ConcurrentMap进行管理,对于一个clientId,在一个JVM实例中,仅有一个MQClientInstance

public class MQClientManager { private final static InternalLogger log = ClientLogger.getLog(); private static MQClientManager instance = new MQClientManager(); private AtomicInteger factoryIndexGenerator = new AtomicInteger(); private ConcurrentMap factoryTable = new ConcurrentHashMap(); } 2.2 实现部分:

对于生产者而言,这里RocketMQ给出了一个默认实现DefaultMQProducer,目前这个方法已经比较臃肿了,直接通过核心方法start()进行跟踪:

public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; this.checkConfig(); if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } // 创建MQClientInstance实例 this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook); // 向生产者组中注册自己, boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); if (startFactory) { mQClientFactory.start(); } log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } //向broker广播自己的ClientID this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { RequestFutureTable.scanExpiredRequest(); } catch (Throwable e) { log.error("scan RequestFutureTable exception", e); } } }, 1000 * 3, 1000); }

start()方法的主要功能就是启动一个MQClient的生产者实现,并保存它的信息,方便后续调用。

在拥有了一个生产者实例之后,我们就可以通过tcp连接将消息从生产者发送到MQ的Broker中,MQ源码中提供了简单消息的发送example:

package org.apache.rocketmq.example.simple; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; public class TestProducer { public static void main(String[] args) throws MQClientException, InterruptedException { // 创建MQclient实例 DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); for (int i = 0; i < 1; i++){ try { Message msg = new Message("TopicTest1", "TagA", "key113", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 同步方法,阻塞 SendResult sendResult = producer.send(msg); // 单向 producer.sendOneway(msg); // 异步回调 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("发送成功"); } @Override public void onException(Throwable e) { System.out.println("发送失败"); } }); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }

方法都很好理解,我们看一下返回信息封装SendResult里面的内容:

基于之前的知识,如果要我们自己去写一个返回的结果,应该包括发送状态,消息本身的标识,消息发送到哪个broker上具体什么地方(消费者根据主题去消费,生产者主题在msg里,这里的返回就是最主要的就是标识状态为了排查),基本上SendResult就是这些结果,注意异步和同步方法都有返回SendResult,你可以根据sendResult记录日志。

public class SendResult { private SendStatus sendStatus; private String msgId; private MessageQueue messageQueue; private long queueOffset; private String transactionId; private String offsetMsgId; private String regionId; }

在关注完发送的主题流程之后,我们进入生产者的send方法,看看到底send过程中有什么细节:

send方法底层有一个默认实现SendDefaultImpl,该方法很长,我只提取三个主要的方法:

private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ){ // 消息验证 Validators.checkMessage(msg, this.defaultMQProducer); // 路由查找 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // 消息发送 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); }

消息发送的基本流程包括:消息验证,路由查找和消息发送

消息验证主要是验证消息长度,源码默认长度1024*1024*4为4MB

路由查找则是根据topic找broker和broker下的queue,找到要发送的queue之后,就是向该queue发送msg,这里面选择queue有不同的策略,你应该瞬间有反应策略模式,默认就是轮询的向broker内该topic下的所有queue发送msg。选择queue的源码:

public MessageQueue selectOneMessageQueue() { // 上一次发送的位置自增 int index = this.sendWhichQueue.getAndIncrement(); // 取模 int pos = Math.abs(index) % this.messageQueueList.size(); // pos小于零就取缓存的queue中的第一个 if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }

选择完queue之后开始消息发送,根据路由信息组装报文,然后根据不同的CommunicationMode(同步,异步,单向)执行不同的方法,当然底层都是一个tcp长连接,通过Channel去发送数据:底层源码也看看,当然,单向,异步不用返回,直接返回null就行,等待发送完之后的处理。

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); log.warn("send a request command to channel failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } } 2.3 异常情况

发送消息过程中的异常处理其实有很多,这里我们着重看一下broker延迟机制,问题的由来是:

如果一个queue所在的broker宕机了,那么你selectOneMessageQueue方法自增取模,取到的下一个很有可能还是宕机的broker所拥有的queue,说白了这是无用功,怎么避免呢?

源码中提供了selectOneMessageQueue的另一种实现:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }

解决的思路就是利用一张CocurrentHashmap存储存在发送问题的broker,让其在一段时间内无法发送,如果过了这段时间,broker和producer的通信结果表明该broker可用,那就再次向该broker尝试发送msg,如果borker可用,就移除该broker的错误标志,有兴趣的可以深入了解一下,我就不展开了。

以上就是发送一个简单消息的全部过程。

2.4 批量消息

当然,RocketMQ也支持批量发送,将msg封装成MessageBatch,其实就是为List提供了encode方法用于压缩msg的内容:

public class MessageBatch extends Message implements Iterable { //messagebatch实际上就是message的list private final List messages; public byte[] encode() { //批处理固定格式化 return MessageDecoder.encodeMessages(messages); } public Iterator iterator() { return messages.iterator(); } }

其中,很优美的实现了一个迭代器模式

一般而言,批量消息发送的策略包括固定格式,固定长度,固定字符结尾等等,RocketMQ采用的是固定格式:

 具体发送过程和发送单条消息没有区别,不在赘述。

生产者端消息发送过程就结束了,架构上没有太多难点,难的反而是高并发编程,这里面用到并发容器很多,有状态服务的维护,netty的网络通信,才是系统实现的难点所在。

今天先到这吧

 

 



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有